home *** CD-ROM | disk | FTP | other *** search
# Source Generated with Decompyle++
# File: in.pyo (Python 2.5)
-
# Module metadata: the revision keyword string as recorded in the compiled
# file, and the markup format used by the docstrings in this module.
__revision__ = '$Id: stanza.py 676 2007-08-30 07:29:20Z jajcus $'
__docformat__ = 'restructuredtext en'
import random

import libxml2

from pyxmpp import xmlextra
from pyxmpp.utils import from_utf8, to_utf8
from pyxmpp.jid import JID
from pyxmpp.xmlextra import common_doc, common_ns, COMMON_NS
# JIDError is caught by Stanza.get_from()/get_to() but was never imported.
# NOTE(review): assumed to be defined in pyxmpp.exceptions -- confirm.
from pyxmpp.exceptions import ProtocolError, JIDMalformedProtocolError, JIDError
# Start the id counter at a random offset so that ids differ between runs.
random.seed()
last_id = random.randrange(1000000)


def gen_id():
    """Return the next stanza id.

    Increments the module-level `last_id` counter and returns its new value.

    :return: the new counter value as a decimal string.
    :returntype: `str`
    """
    global last_id
    last_id += 1
    return str(last_id)
-
-
class Stanza:
    """Wrapper around a libxml2 node holding a single stanza element.

    :Ivariables:
        - `xmlnode`: the stanza element; it is attached to `common_doc` by
          the constructor and set to `None` once `free()` has run.
        - `_error`: cached `StanzaErrorNode` of the stanza, or `None`.
        - `stream`: the value passed as the `stream` constructor argument;
          this class only stores it.
    """

    stanza_type = 'Unknown'

    def __init__(self, name_or_xmlnode, from_jid=None, to_jid=None,
                 stanza_type=None, stanza_id=None, error=None,
                 error_cond=None, stream=None):
        """Create a stanza from another stanza, an XML node or an element name.

        :Parameters:
            - `name_or_xmlnode`: a `Stanza` (its node is copied), a
              `libxml2.xmlNode` (copied into `common_doc`), or anything else,
              which is passed to `common_doc.newChild` as the element name.
            - `from_jid`: value for the ``from`` attribute; converted with
              `JID` when it is not a `JID` already. `None` leaves the
              attribute alone.
            - `to_jid`: same as `from_jid`, for the ``to`` attribute.
            - `stanza_type`: value for the ``type`` attribute (set only when
              true).
            - `stanza_id`: value for the ``id`` attribute (set only when
              true).
            - `error`: for stanzas of type ``error``: passed to
              `StanzaErrorNode` with ``copy=1``.
            - `error_cond`: for stanzas of type ``error`` and only when
              `error` is false: passed to `StanzaErrorNode` to build a new
              error child.
            - `stream`: stored as `self.stream`.
        """
        # Set both attributes first so that __del__/free() are safe even if
        # anything below raises.
        self._error = None
        self.xmlnode = None
        if isinstance(name_or_xmlnode, Stanza):
            self.xmlnode = name_or_xmlnode.xmlnode.copyNode(1)
            common_doc.addChild(self.xmlnode)
        elif isinstance(name_or_xmlnode, libxml2.xmlNode):
            self.xmlnode = name_or_xmlnode.docCopyNode(common_doc, 1)
            common_doc.addChild(self.xmlnode)
            try:
                ns = self.xmlnode.ns()
            except libxml2.treeError:
                ns = None
            # No namespace at all, or one whose `name` is empty: rebind the
            # copied node to the common namespace.
            if not ns or not ns.name:
                xmlextra.replace_ns(self.xmlnode, ns, common_ns)
        else:
            self.xmlnode = common_doc.newChild(common_ns, name_or_xmlnode, None)

        if from_jid is not None:
            if not isinstance(from_jid, JID):
                from_jid = JID(from_jid)
            self.xmlnode.setProp('from', from_jid.as_utf8())

        if to_jid is not None:
            if not isinstance(to_jid, JID):
                to_jid = JID(to_jid)
            self.xmlnode.setProp('to', to_jid.as_utf8())

        if stanza_type:
            self.xmlnode.setProp('type', stanza_type)

        if stanza_id:
            self.xmlnode.setProp('id', stanza_id)

        if self.get_type() == 'error':
            # Imported here rather than at module level.
            # NOTE(review): presumably to avoid an import cycle -- confirm.
            from pyxmpp.error import StanzaErrorNode
            if error:
                self._error = StanzaErrorNode(error, parent=self.xmlnode, copy=1)
            elif error_cond:
                self._error = StanzaErrorNode(error_cond, parent=self.xmlnode)

        self.stream = stream

    def __del__(self):
        """Release the XML node when the object is collected."""
        # getattr: the instance may never have completed __init__.
        if getattr(self, 'xmlnode', None):
            self.free()

    def free(self):
        """Unlink and free the XML node of this stanza.

        Does nothing when the node has already been released, so calling it
        more than once is harmless.
        """
        if self.xmlnode is None:
            return
        if self._error:
            self._error.free_borrowed()
        self.xmlnode.unlinkNode()
        self.xmlnode.freeNode()
        self.xmlnode = None

    def copy(self):
        """Return a new `Stanza` built from a copy of this one's node."""
        return Stanza(self)

    def serialize(self):
        """Return the stanza node serialized with the ``utf-8`` encoding."""
        return self.xmlnode.serialize(encoding='utf-8')

    def get_node(self):
        """Return the underlying XML node (not a copy)."""
        return self.xmlnode

    def _get_jid_attr(self, attr_name):
        """Return attribute `attr_name` parsed as a `JID`, or `None` if absent.

        :raise JIDMalformedProtocolError: when building the `JID` raises
            `JIDError`.
        """
        if not self.xmlnode.hasProp(attr_name):
            return None
        try:
            return JID(from_utf8(self.xmlnode.prop(attr_name)))
        except JIDError:
            raise JIDMalformedProtocolError(
                "Bad JID in the '%s' attribute" % (attr_name,))

    def get_from(self):
        """Return the ``from`` attribute as a `JID`, or `None` when not set.

        :raise JIDMalformedProtocolError: when the attribute is not a valid
            JID.
        """
        return self._get_jid_attr('from')

    get_from_jid = get_from

    def get_to(self):
        """Return the ``to`` attribute as a `JID`, or `None` when not set.

        :raise JIDMalformedProtocolError: when the attribute is not a valid
            JID.
        """
        return self._get_jid_attr('to')

    get_to_jid = get_to

    def get_type(self):
        """Return the ``type`` attribute decoded with `from_utf8`, or `None`."""
        if self.xmlnode.hasProp('type'):
            return from_utf8(self.xmlnode.prop('type'))
        return None

    get_stanza_type = get_type

    def get_id(self):
        """Return the ``id`` attribute decoded with `from_utf8`, or `None`."""
        if self.xmlnode.hasProp('id'):
            return from_utf8(self.xmlnode.prop('id'))
        return None

    get_stanza_id = get_id

    def get_error(self):
        """Return the `StanzaErrorNode` of this stanza.

        The object is created on first use from the first ``ns:error`` child
        (without copying the node) and cached in `_error`.

        :raise ProtocolError: when the stanza has no ``ns:error`` child.
        """
        if self._error:
            return self._error

        n = self.xpath_eval(u'ns:error')
        if not n:
            raise ProtocolError(
                None,
                'This stanza contains no error: %r' % (self.serialize(),))

        from pyxmpp.error import StanzaErrorNode
        self._error = StanzaErrorNode(n[0], copy=0)
        return self._error

    def set_from(self, from_jid):
        """Set the ``from`` attribute, or remove it when `from_jid` is false.

        :return: the result of the underlying `setProp`/`unsetProp` call.
        """
        if from_jid:
            return self.xmlnode.setProp('from', JID(from_jid).as_utf8())
        return self.xmlnode.unsetProp('from')

    def set_to(self, to_jid):
        """Set the ``to`` attribute, or remove it when `to_jid` is false.

        :return: the result of the underlying `setProp`/`unsetProp` call.
        """
        if to_jid:
            return self.xmlnode.setProp('to', JID(to_jid).as_utf8())
        return self.xmlnode.unsetProp('to')

    def set_type(self, stanza_type):
        """Set the ``type`` attribute, or remove it when `stanza_type` is false.

        :return: the result of the underlying `setProp`/`unsetProp` call.
        """
        if stanza_type:
            return self.xmlnode.setProp('type', to_utf8(stanza_type))
        return self.xmlnode.unsetProp('type')

    def set_id(self, stanza_id):
        """Set the ``id`` attribute, or remove it when `stanza_id` is false.

        :return: the result of the underlying `setProp`/`unsetProp` call.
        """
        if stanza_id:
            return self.xmlnode.setProp('id', to_utf8(stanza_id))
        return self.xmlnode.unsetProp('id')

    def _unlink_children(self):
        """Detach every child node from the stanza element."""
        # NOTE(review): children are only unlinked, never freed -- confirm
        # whether callers rely on still holding them or whether this leaks.
        while self.xmlnode.children:
            self.xmlnode.children.unlinkNode()

    def set_content(self, content):
        """Replace the stanza content with `content`.

        All current children are unlinked first; `content` is then added as
        in `add_content`, except that a plain value is applied with
        `setContent` instead of `addContent`.
        """
        self._unlink_children()
        if hasattr(content, 'as_xml'):
            content.as_xml(parent=self.xmlnode, doc=common_doc)
        elif isinstance(content, libxml2.xmlNode):
            self.xmlnode.addChild(content.docCopyNode(common_doc, 1))
        else:
            self.xmlnode.setContent(content)

    def add_content(self, content):
        """Append `content` to the stanza.

        :Parameters:
            - `content`: an object with an ``as_xml`` method (called with
              ``parent`` and ``doc`` keyword arguments), a `libxml2.xmlNode`
              (a copy is added as a child), or any other value, which is
              handed to `addContent`.
        """
        if hasattr(content, 'as_xml'):
            content.as_xml(parent=self.xmlnode, doc=common_doc)
        elif isinstance(content, libxml2.xmlNode):
            self.xmlnode.addChild(content.docCopyNode(common_doc, 1))
        else:
            self.xmlnode.addContent(content)

    def set_new_content(self, ns_uri, name):
        """Unlink all children, then add one new child element.

        :return: the new child, as returned by `add_new_content`.
        """
        self._unlink_children()
        return self.add_new_content(ns_uri, name)

    def add_new_content(self, ns_uri, name):
        """Append a new, empty child element and return it.

        :Parameters:
            - `ns_uri`: when true, a namespace with this URI and no prefix
              is declared on the new element and assigned to it.
            - `name`: element name; encoded with `to_utf8`.

        :return: the newly created child node.
        """
        c = self.xmlnode.newChild(None, to_utf8(name), None)
        if ns_uri:
            ns = c.newNs(ns_uri, None)
            c.setNs(ns)
        return c

    def xpath_eval(self, expr, namespaces=None):
        """Evaluate an XPath expression with the stanza node as context.

        The prefix ``ns`` is always bound to `COMMON_NS`.

        :Parameters:
            - `expr`: the XPath expression.
            - `namespaces`: optional mapping of extra prefixes to namespace
              URIs to register before evaluating.

        :return: the result of `xpathEval`.
        """
        ctxt = common_doc.xpathNewContext()
        try:
            ctxt.setContextNode(self.xmlnode)
            ctxt.xpathRegisterNs('ns', COMMON_NS)
            if namespaces:
                for prefix, uri in namespaces.items():
                    ctxt.xpathRegisterNs(unicode(prefix), uri)
            return ctxt.xpathEval(unicode(expr))
        finally:
            # Free the context even when registration or evaluation raises.
            ctxt.xpathFreeContext()

    def __eq__(self, other):
        """Stanzas are equal when their nodes serialize to the same text."""
        if not isinstance(other, Stanza):
            return False
        return self.xmlnode.serialize() == other.xmlnode.serialize()

    def __ne__(self, other):
        """Inverse of `__eq__`; anything that is not a `Stanza` is unequal."""
        if not isinstance(other, Stanza):
            return True
        return self.xmlnode.serialize() != other.xmlnode.serialize()
-
-
-